import numpy as np
from spatio_flux import SpatioFluxVivarium, build_path, render_path
Make a Vivarium¶
view available types and processes
# TODO -- make a spatio-flux specific Vivarium with the the core preloaded
vi = SpatioFluxVivarium()
# view the available types
vi.get_types()
['', 'maybe', 'length^2*mass/substance*temperature*time^2', 'enum', 'tree', 'length/mass', 'mass^0_5/length^1_5', 'protocol', 'boolean', 'path', 'length*temperature', 'substance', 'current*length^2*time', 'process', 'length^1_5*mass^0_5/time', 'length^2*mass/current*time^3', 'mass/temperature^4*time^3', 'substance/length^3', 'string', 'kinetics', 'time', 'number', 'current^2*time^4/length^2*mass', 'quote', '/substance', 'current^2*time^3/length^2*mass', 'integer', 'substance/time', 'current*time^2/length^2*mass', 'length', 'length^1_5*mass^0_5/time^2', 'luminosity', 'tuple', 'union', 'time/length', 'interval', 'mass', 'printing_unit/length', 'luminosity/length^2', 'emitter_mode', 'current*length^2', 'map', 'length*time/mass', 'mass/length', 'length^3', 'substrate_role', 'length^2*mass/time', '/temperature*time', 'length^2*mass/current^2*time^2', 'mass^0_5/length^0_5*time', 'reaction', 'length/time^2', 'boundary_side', 'current*time', 'length^2*mass/time^2', 'length*mass/current*time^3', 'length^2*mass/current^2*time^3', 'printing_unit', 'length*mass/time^2', 'length^2/time^2', 'length^4*mass/time^3', 'current*time/mass', 'length^3*mass/current^2*time^4', 'list', 'time^2/length', '/time', 'length/time', 'length^2', 'array', 'current*time/substance', '/length', 'mass/time^2', 'mass/time^3', '/printing_unit', 'length^0_5*mass^0_5/time', 'length^2*mass/temperature*time^2', 'step', 'wires', 'float', 'temperature', 'length^2/time', 'length*mass/current^2*time^2', 'any', 'mass/length^3', 'current*length*time', 'schema', 'current', 'length^3/time', 'mass/length*time', 'mass/current*time^2', 'bounds', 'length^2*mass/current*time^2', 'particle', 'length^0_5*mass^0_5', 'length^4*mass/current*time^3', 'current^2*time^4/length^3*mass', 'length^2*mass/time^3', 'mass/length*time^2', 'edge', 'positive_float', 'length^3/mass*time^2', 'composite']
# view the available processes
vi.get_processes()
['Particles', 'MinimalParticle', 'DynamicFBA', 'ram-emitter', 'DiffusionAdvection', 'console-emitter', 'json-emitter', 'composite']
dFBA¶
Dynamic Flux Balance Analysis (dFBA) extends traditional Flux Balance Analysis (FBA) to model the dynamic behavior of metabolic networks over time, allowing for the simulation of growth and substrate consumption in a changing environment.
# inspect the config schema for the 'increase' process
vi.process_config('DynamicFBA')
{'model_file': 'string',
'kinetic_params': 'map[tuple[float,float]]',
'substrate_update_reactions': 'map[string]',
'biomass_identifier': 'string',
'bounds': 'map[bounds]'}
vi.core.default(vi.process_config('DynamicFBA')) # TODO use default to
{'model_file': '',
'kinetic_params': {},
'substrate_update_reactions': {},
'biomass_identifier': '',
'bounds': {}}
dfba_config = vi.process_config('DynamicFBA', dataclass=True) # TODO -- make this work
Error finding process DynamicFBA: 'type_parameters'
# TODO - enable get_dataclass to work with the new process
# dfba_config = v.get_dataclass('DynamicFBA')
dfba_config = {
"model_file": "textbook",
"kinetic_params": {
"glucose": (0.5, 1),
"acetate": (0.5, 2)},
"substrate_update_reactions": {
"glucose": "EX_glc__D_e",
"acetate": "EX_ac_e"},
"biomass_identifier": "biomass",
"bounds": {
"EX_o2_e": {"lower": -2, "upper": None},
"ATPM": {"lower": 1, "upper": 1}
}
}
# make the vivarium
v = SpatioFluxVivarium()
# add a dynamic FBA process called 'dFBA'
v.add_process(name="dFBA",
process_id="DynamicFBA",
config=dfba_config,
)
v.diagram(dpi='70')
mol_ids = ["glucose", "acetate", "biomass"]
path=["fields"]
for mol_id in mol_ids:
v.add_object(
name=mol_id,
path=path,
value=np.random.rand()
)
v.connect_process(
name="dFBA",
inputs={
"substrates": {
mol_id: build_path(path, mol_id)
for mol_id in mol_ids}
},
outputs={
"substrates": {
mol_id: build_path(path, mol_id)
for mol_id in mol_ids}
}
)
# add an emitter to save results
v.add_emitter()
# TODO -- show_values does not work
v.diagram(dpi='70', show_values=True)
v.set_value(path = ['fields', 'glucose'], value=10)
v.set_value(path = ['fields', 'biomass'], value=0.1)
field = v.get_value(['fields'])
print(field)
{'glucose': 10, 'acetate': 0.6458215874293376, 'biomass': 0.1}
v.save(filename='dFBA_t0')
Saved file: out/dFBA_t0.json
# run the simulation for 10 time units
v.run(interval=60)
v.plot_timeseries(
subplot_size=(8, 3),
combined_vars=[
[
'/fields/glucose',
'/fields/acetate',
'/fields/biomass'
]
]
)
Spatial dFBA¶
mol_ids = ["glucose", "acetate", "biomass"]
path=["fields"]
rows = 2
columns = 1
# make the vivarium
v2 = SpatioFluxVivarium()
for mol_id in mol_ids:
v2.add_object(
name=mol_id,
path=path,
value=np.random.rand(rows, columns)
)
# add a dynamic FBA process at every location
for i in range(rows):
for j in range(columns):
dfba_name = f"dFBA[{i},{j}]"
# add a process for this location
v2.add_process(
name=dfba_name,
process_id="DynamicFBA",
config=dfba_config,
)
v2.connect_process(
name=dfba_name,
inputs={"substrates": {
mol_id: build_path(path, mol_id, i, j)
for mol_id in mol_ids}},
outputs={"substrates": {
mol_id: build_path(path, mol_id, i, j)
for mol_id in mol_ids}}
)
# add an emitter to save results
v2.add_emitter()
v2.diagram(dpi='70')
# change some initial values
v2.merge_value(path = ['fields', 'glucose', 0, 0], value=10.0)
v2.merge_value(path = ['fields', 'biomass', 0, 0], value=0.1)
field = v2.get_value(['fields'])
print(field)
{'glucose': array([[10. ],
[ 0.64587147]]), 'acetate': array([[0.99690796],
[0.99130859]]), 'biomass': array([[0.1 ],
[0.7568275]])}
v2.run(60)
v2.get_timeseries(as_dataframe=True)
| /global_time | /fields/glucose | /fields/acetate | /fields/biomass | |
|---|---|---|---|---|
| 0 | 0.0 | [[10.0], [0.6458714654580358]] | [[0.9969079641276027], [0.9913085887832321]] | [[0.1], [0.7568274982093428]] |
| 1 | 1.0 | [[9.904761904761905], [0.21928497639487077]] | [[1.009016016323692], [0.4955463158333449]] | [[0.1079432568708625], [0.8034372879187125]] |
| 2 | 2.0 | [[9.80200585245463], [0.0]] | [[1.0219919037998328], [0.0]] | [[0.11651530697082768], [0.8399907649281485]] |
| 3 | 3.0 | [[9.691145527078628], [0.0]] | [[1.0358867345493203], [0.0]] | [[0.12576552148631978], [0.8399907649281485]] |
| 4 | 4.0 | [[9.571550338512791], [0.0]] | [[1.0507521117125165], [0.0]] | [[0.13574706721093277], [0.8399907649281485]] |
| ... | ... | ... | ... | ... |
| 56 | 56.0 | [[0.0], [0.0]] | [[0.0], [0.0]] | [[0.9893401479522954], [0.8399907649281485]] |
| 57 | 57.0 | [[0.0], [0.0]] | [[0.0], [0.0]] | [[0.9893401479522954], [0.8399907649281485]] |
| 58 | 58.0 | [[0.0], [0.0]] | [[0.0], [0.0]] | [[0.9893401479522954], [0.8399907649281485]] |
| 59 | 59.0 | [[0.0], [0.0]] | [[0.0], [0.0]] | [[0.9893401479522954], [0.8399907649281485]] |
| 60 | 60.0 | [[0.0], [0.0]] | [[0.0], [0.0]] | [[0.9893401479522954], [0.8399907649281485]] |
61 rows × 4 columns
# get a list of all the paths so they can be plotted together in a single graph
all_paths = []
for i in range(rows):
for j in range(columns):
# get the paths for this location
location_path = []
for mol_id in mol_ids:
this_path = build_path(path, mol_id, i, j)
rendered_path = render_path(this_path)
location_path.append(rendered_path)
all_paths.append(location_path)
# print(all_paths)
v2.plot_timeseries(
subplot_size=(8, 3),
combined_vars=all_paths
)
v2.show_video()
Diffusion/Advection¶
This approach models the physical processes of diffusion and advection in two dimensions, providing a way to simulate how substances spread and are transported across a spatial domain, essential for understanding patterns of concentration over time and space.
vi.core.default(vi.process_config('DiffusionAdvection'))
{'n_bins': (0, 0),
'bounds': (0.0, 0.0),
'default_diffusion_rate': 0.1,
'default_diffusion_dt': 0.1,
'diffusion_coeffs': {},
'advection_coeffs': {}}
bounds = (10.0, 10.0)
n_bins = (10, 10)
mol_ids = [
'glucose',
'acetate',
'biomass'
]
diffusion_rate = 1e-1
diffusion_dt = 1e-1
advection_coeffs = {
'biomass': (0, -0.1)
}
path = ['fields']
v3 = SpatioFluxVivarium()
for mol_id in mol_ids:
v3.add_object(
name=mol_id,
path=path,
value=np.random.rand(n_bins[0], n_bins[1])
)
v3.add_process(name='diffusion_advection',
process_id='DiffusionAdvection',
config={
'n_bins': n_bins,
'bounds': bounds,
'default_diffusion_rate': diffusion_rate,
'default_diffusion_dt': diffusion_dt,
# 'diffusion_coeffs': diffusion_coeffs_all,
'advection_coeffs': advection_coeffs,
},
inputs={'fields': ['fields']},
outputs={'fields': ['fields']}
)
# add an emitter to save results
v3.add_emitter()
v3.diagram(dpi='70')
v3.run(60)
v3.plot_snapshots(n_snapshots=5)
v3.show_video()
COMETS (Computation Of Microbial Ecosystems in Time and Space)¶
COMETS combines dynamic FBA with spatially resolved physical processes (like diffusion and advection) to simulate the growth, metabolism, and interaction of microbial communities within a structured two-dimensional environment, capturing both biological and physical complexities.
bounds = (10.0, 10.0)
n_bins = (10, 10)
mol_ids = [
'glucose',
'acetate',
'biomass'
]
diffusion_rate = 1e-1
diffusion_dt = 1e-1
advection_coeffs = {
'biomass': (0, -0.1)
}
path = ['fields']
v4 = SpatioFluxVivarium()
# create the molecular fields
# for mol_id in mol_ids:
v4.add_object(
name='glucose',
path=path,
value=np.random.rand(n_bins[0], n_bins[1])
)
v4.add_object(
name='biomass',
path=path,
value=np.ones((n_bins[0], n_bins[1])) * 0.01
)
v4.add_object(
name='acetate',
path=path,
value=np.zeros((n_bins[0], n_bins[1]))
)
# add a diffusion/advection process
v4.add_process(name='diffusion_advection',
process_id='DiffusionAdvection',
config={
'n_bins': n_bins,
'bounds': bounds,
'default_diffusion_rate': diffusion_rate,
'default_diffusion_dt': diffusion_dt,
'advection_coeffs': advection_coeffs},
inputs={'fields': path},
outputs={'fields': path}
)
# add a dynamic FBA process at every location
for i in range(n_bins[0]):
for j in range(n_bins[1]):
dfba_name = f"dFBA[{i},{j}]"
# add a process for this location
v4.add_process(
name=dfba_name,
process_id="DynamicFBA",
config=dfba_config,
)
v4.connect_process(
name=dfba_name,
inputs={"substrates": {
mol_id: build_path(path, mol_id, i, j)
for mol_id in mol_ids}},
outputs={"substrates": {
mol_id: build_path(path, mol_id, i, j)
for mol_id in mol_ids}}
)
# add an emitter to save results
v4.add_emitter()
v4.diagram(dpi='70',
remove_nodes=[f"/dFBA[{i},{j}]" for i in range(n_bins[0]-1) for j in range(n_bins[1])]
)
v4.run(60)
v4.plot_snapshots(n_snapshots=5)
v4.show_video()
v4.plot_timeseries(
subplot_size=(8, 3),
query=[
'/fields/glucose/0/0',
'/fields/acetate/0/0',
'/fields/biomass/0/0',
],
combined_vars=[[
'/fields/glucose/0/0',
'/fields/acetate/0/0',
'/fields/biomass/0/0',
]]
)
Particles¶
# TODO -- remove this from final
import numpy as np
from spatio_flux import SpatioFluxVivarium
bounds = (10.0, 20.0) # Bounds of the environment
n_bins = (20, 40) # Number of bins in the x and y directions
v5 = SpatioFluxVivarium()
v5.add_process(
name='particle_movement',
process_id='Particles',
config={
'n_bins': n_bins,
'bounds': bounds,
'diffusion_rate': 0.1,
'advection_rate': (0, -0.1),
'add_probability': 0.4,
'boundary_to_add': ['top']
},
)
v5.connect_process(
name='particle_movement',
inputs={
'fields': ['fields'],
'particles': ['particles']
},
outputs={
'fields': ['fields'],
'particles': ['particles']
}
)
v5.initialize_process(
path='particle_movement',
config={'n_particles': 2}
)
v5.add_emitter()
v5.diagram(dpi='70')
v5.run(100)
v5_results = v5.get_results()
from spatio_flux.viz.plot import plot_species_distributions_with_particles_to_gif
# TODO -- integrate this method with vivarium
plot_species_distributions_with_particles_to_gif(
v5_results,
skip_frames=3,
bounds=bounds
)
Saving GIF to species_distribution_with_particles.gif
v5.diagram(dpi='70')
v5.save(filename='v5_post_run', outdir='out')
Saved file: out/v5_post_run.json
Minimal particle process¶
from spatio_flux.processes.particles import get_minimal_particle_composition
particle_schema = get_minimal_particle_composition(v5.core)
document = v5.make_document()
# document['state']['particles'] = {}
# # document['composition']['particles']
# document['composition'] = particle_schema
document['state']['global_time'] = 0.0
v6 = SpatioFluxVivarium(document=document)
v6.diagram(dpi='70')
v6.run(10)
v6_results = v6.get_results()
--------------------------------------------------------------------------- TypeError Traceback (most recent call last) Cell In[39], line 1 ----> 1 v6.run(10) 2 v6_results = v6.get_results() File ~/code/vivarium-interface/vivarium/vivarium.py:564, in Vivarium.run(self, interval) 561 if not self.emitter_paths: 562 self.add_emitter() --> 564 self.composite.run(interval) File ~/code/process-bigraph/process_bigraph/composite.py:783, in Composite.run(self, interval, force_complete) 781 for path in self.process_paths: 782 process = get_path(self.state, path) --> 783 full_step = self.run_process( 784 path, 785 process, 786 end_time, 787 full_step, 788 force_complete) 790 # apply updates based on process times in self.front 791 if full_step == math.inf: 792 # no processes ran, jump to next process File ~/code/process-bigraph/process_bigraph/composite.py:680, in Composite.run_process(self, path, process, end_time, full_step, force_complete) 677 full_step = interval 679 if future <= end_time: --> 680 update = self.process_update( 681 path, 682 process, 683 state, 684 process_interval 685 ) 687 # update front, to be applied at its projected time 688 self.front[path]['time'] = future File ~/code/process-bigraph/process_bigraph/composite.py:622, in Composite.process_update(self, path, process, states, interval, ports_key) 600 """Start generating a process's update. 601 602 This function is similar to :py:meth:`_invoke_process` except in (...) 617 ``store``. 618 """ 620 states = strip_schema_keys(states) --> 622 update = process['instance'].invoke(states, interval) 624 def defer_project(update, args): 625 schema, state, path = args File ~/code/process-bigraph/process_bigraph/composite.py:303, in Process.invoke(self, state, interval) 302 def invoke(self, state, interval): --> 303 update = self.update(state, interval) 304 sync = SyncUpdate(update) 305 return sync File ~/code/spatio-flux/spatio_flux/processes/particles.py:183, in Particles.update(self, state, interval) 180 # Apply diffusion and advection 181 dx, dy = np.random.normal(0, self.config['diffusion_rate'], 2) + self.config['advection_rate'] --> 183 new_x_position = particle['position'][0] + dx 184 new_y_position = particle['position'][1] + dy 186 # Check and remove particles if they hit specified boundaries TypeError: can only concatenate str (not "numpy.float64") to str
from spatio_flux.processes.particles import get_minimal_particle_composition
bounds = (10.0, 20.0) # Bounds of the environment
n_bins = (4, 8) # Number of bins in the x and y directions
# same as before
v6 = SpatioFluxVivarium()
v6.add_process(
name='particle_movement',
process_id='Particles',
config={
'n_bins': n_bins,
'bounds': bounds,
'diffusion_rate': 0.1,
'advection_rate': (0, -0.1),
'add_probability': 0.4,
'boundary_to_add': ['top']})
v6.connect_process(
name='particle_movement',
inputs={
'fields': ['fields'],
'particles': ['particles']},
outputs={
'fields': ['fields'],
'particles': ['particles']})
# Here we go beyond the previous particle simulation
# add a minimal particle process into the schema
particle_schema = get_minimal_particle_composition(v6.core)
v6.set_schema('particles', particle_schema['particles'])
v6.initialize_process(
path='particle_movement',
config={'n_particles': 1})
v6.diagram(dpi='70')
--------------------------------------------------------------------------- RecursionError Traceback (most recent call last) Cell In[23], line 30 27 # Here we go beyond the previous particle simulation 28 # add a minimal particle process into the schema 29 particle_schema = get_minimal_particle_composition(v6.core) ---> 30 v6.set_schema('particles', particle_schema['particles']) 32 v6.initialize_process( 33 path='particle_movement', 34 config={'n_particles': 1}) 36 v6.diagram(dpi='70') File ~/code/vivarium-interface/vivarium/vivarium.py:281, in Vivarium.set_schema(self, path, schema) 276 path = parse_path(path) 277 # TODO -- make this set the value in the composite using core 278 # set_path(self.composite.composition, path=path, value=schema) 279 280 # TODO -- need to regenerate the composition --> 281 self.composite.merge(schema, self.composite.state, path) File ~/code/process-bigraph/process_bigraph/composite.py:584, in Composite.merge(self, schema, state, path) 582 def merge(self, schema, state, path=None): 583 path = path or [] --> 584 self.composition, self.state = self.core.merge( 585 self.composition, 586 self.state, 587 path, 588 schema, 589 state) File ~/code/bigraph-schema/bigraph_schema/type_system.py:1308, in TypeSystem.merge(self, schema, state, path, update_schema, update_state, defer) 1299 def merge(self, schema, state, path, update_schema, update_state, defer=False): 1300 top_schema, top_state = self.set_slice( 1301 schema, 1302 state, (...) 1305 update_state, 1306 defer) -> 1308 return self.generate(top_schema, top_state) File ~/code/bigraph-schema/bigraph_schema/type_system.py:2092, in TypeSystem.generate(self, schema, state) 2091 def generate(self, schema, state): -> 2092 merged_schema, merged_state = self.sort( 2093 schema, 2094 state) 2096 _, _, top_schema, top_state = self.generate_recur( 2097 merged_schema, 2098 merged_state) 2100 return top_schema, top_state File ~/code/bigraph-schema/bigraph_schema/type_system.py:311, in TypeSystem.sort(self, schema, state) 304 schema = self.access(schema) 306 sort_function = self.choose_method( 307 schema, 308 state, 309 'sort') --> 311 return sort_function( 312 self, 313 schema, 314 state) File ~/code/bigraph-schema/bigraph_schema/type_functions.py:2317, in sort_any(core, schema, state) 2315 merged_schema[key] = schema[key] 2316 else: -> 2317 subschema, merged_state[key] = core.sort( 2318 schema.get(key, {}), 2319 state.get(key, None)) 2320 if subschema: 2321 merged_schema[key] = subschema File ~/code/bigraph-schema/bigraph_schema/type_system.py:311, in TypeSystem.sort(self, schema, state) 304 schema = self.access(schema) 306 sort_function = self.choose_method( 307 schema, 308 state, 309 'sort') --> 311 return sort_function( 312 self, 313 schema, 314 state) File ~/code/bigraph-schema/bigraph_schema/type_functions.py:2317, in sort_any(core, schema, state) 2315 merged_schema[key] = schema[key] 2316 else: -> 2317 subschema, merged_state[key] = core.sort( 2318 schema.get(key, {}), 2319 state.get(key, None)) 2320 if subschema: 2321 merged_schema[key] = subschema [... skipping similar frames: TypeSystem.sort at line 311 (1479 times), sort_any at line 2317 (1479 times)] File ~/code/bigraph-schema/bigraph_schema/type_system.py:311, in TypeSystem.sort(self, schema, state) 304 schema = self.access(schema) 306 sort_function = self.choose_method( 307 schema, 308 state, 309 'sort') --> 311 return sort_function( 312 self, 313 schema, 314 state) File ~/code/bigraph-schema/bigraph_schema/type_functions.py:2317, in sort_any(core, schema, state) 2315 merged_schema[key] = schema[key] 2316 else: -> 2317 subschema, merged_state[key] = core.sort( 2318 schema.get(key, {}), 2319 state.get(key, None)) 2320 if subschema: 2321 merged_schema[key] = subschema File ~/code/bigraph-schema/bigraph_schema/type_system.py:306, in TypeSystem.sort(self, schema, state) 303 def sort(self, schema, state): 304 schema = self.access(schema) --> 306 sort_function = self.choose_method( 307 schema, 308 state, 309 'sort') 311 return sort_function( 312 self, 313 schema, 314 state) File ~/code/bigraph-schema/bigraph_schema/type_system.py:742, in TypeSystem.choose_method(self, schema, state, method_name) 739 found = schema[method_key] 741 if found is None: --> 742 any_type = self.access('any') 743 found = any_type[method_key] 745 registry = self.lookup_registry(method_key) File ~/code/bigraph-schema/bigraph_schema/type_system.py:440, in TypeSystem.access(self, schema, strict) 438 def access(self, schema, strict=False): 439 if isinstance(schema, str): --> 440 return self.access_str(schema, strict=strict) 441 else: 442 return self.find(schema, strict=strict) RecursionError: maximum recursion depth exceeded in comparison
# v6
# particle_schema = get_minimal_particle_composition(v6.core)
# particle_schema
v6.run(10)
v6_results = v6.get_results()
from spatio_flux.viz.plot import plot_species_distributions_with_particles_to_gif
# TODO -- integrate this method with vivarium
plot_species_distributions_with_particles_to_gif(
v6_results,
skip_frames=2,
bounds=bounds
)
Saving GIF to species_distribution_with_particles.gif